Coverage Report

Created: 2026-04-21 11:31

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
D:\a\scloud-dns\scloud-dns\src\workers\mod.rs
Line
Count
Source
1
use crate::exceptions::SCloudException;
2
use crate::workers::manager::StartGate;
3
use crate::workers::task::InFlightTask;
4
use crate::{log_error, log_info, log_sdebug, log_strace};
5
use anyhow::Result;
6
use serde::{Deserialize, Serialize};
7
use std::sync::Arc;
8
use std::sync::atomic::{AtomicBool, AtomicU8, AtomicU64, AtomicUsize, Ordering};
9
use tokio::sync::{Mutex, MutexGuard, Semaphore, mpsc};
10
11
pub(crate) mod manager;
12
pub(crate) mod queue;
13
pub(crate) mod reply_registry;
14
pub(crate) mod task;
15
pub(crate) mod tests;
16
pub(crate) mod types;
17
18
#[allow(non_camel_case_types)]
19
#[derive(Debug)]
20
pub(crate) struct SCloudWorker {
21
    // IDENTITY
22
    pub(crate) worker_id: AtomicU64,
23
    pub(crate) worker_type: AtomicU8,
24
25
    // CHANNEL
26
    pub(crate) dns_tx: Mutex<Vec<mpsc::Sender<InFlightTask>>>,
27
    pub(crate) dns_rx: Mutex<Vec<mpsc::Receiver<InFlightTask>>>,
28
29
    // RESOURCES/LIMITS
30
    pub(crate) stack_size_bytes: AtomicUsize,
31
    pub(crate) buffer_budget_bytes: AtomicUsize,
32
    pub(crate) max_stack_size_bytes: AtomicUsize,
33
    pub(crate) max_buffer_budget_bytes: AtomicUsize,
34
35
    // RUNTIME STATE
36
    pub(crate) state: AtomicU8,
37
    pub(crate) shutdown_requested: AtomicBool,
38
    pub(crate) shutdown_mode: AtomicU8,
39
40
    // BACKPRESSURE/IN-FLIGHT
41
    pub(crate) in_flight: AtomicUsize, // for metrics
42
    pub(crate) in_flight_sem: Arc<Semaphore>,
43
    pub(crate) max_in_flight: AtomicUsize,
44
45
    // METRICS
46
    pub(crate) jobs_done: AtomicU64,
47
    pub(crate) jobs_failed: AtomicU64,
48
    pub(crate) jobs_retried: AtomicU64,
49
50
    pub(crate) last_job_started_ms: AtomicU64,
51
    pub(crate) last_job_finished_ms: AtomicU64,
52
53
    pub(crate) last_error_code: AtomicU64,
54
    pub(crate) last_error_at_ms: AtomicU64,
55
56
    // CORRELATION/TRACING
57
    pub(crate) last_task_id_hi: AtomicU64, // 128-bit UUID split
58
    pub(crate) last_task_id_lo: AtomicU64,
59
}
60
61
impl SCloudWorker {
62
    const NEVER_APPLIED: u8 = 0xFF;
63
64
69
    pub(crate) fn new(worker_type: WorkerType) -> Result<Self, SCloudException> {
65
69
        Ok(Self {
66
69
            worker_id: AtomicU64::new(manager::generate_worker_id()),
67
69
            worker_type: AtomicU8::new(worker_type as u8),
68
69
            dns_tx: Mutex::new(Vec::new()),
69
69
            dns_rx: Mutex::new(Vec::new()),
70
69
            stack_size_bytes: AtomicUsize::new(2 * 1024 * 1024),
71
69
            buffer_budget_bytes: AtomicUsize::new(4 * 1024 * 1024),
72
69
            max_stack_size_bytes: AtomicUsize::new(32 * 1024 * 1024),
73
69
            max_buffer_budget_bytes: AtomicUsize::new(256 * 1024 * 1024),
74
69
            state: AtomicU8::new(WorkerState::INIT as u8),
75
69
            shutdown_requested: AtomicBool::new(false),
76
69
            shutdown_mode: AtomicU8::new(ShutdownMode::GRACEFUL as u8),
77
69
            in_flight: AtomicUsize::new(0),
78
69
            in_flight_sem: Arc::new(Semaphore::new(512)),
79
69
            max_in_flight: AtomicUsize::new(512),
80
69
            jobs_done: AtomicU64::new(0),
81
69
            jobs_failed: AtomicU64::new(0),
82
69
            jobs_retried: AtomicU64::new(0),
83
69
            last_job_started_ms: AtomicU64::new(0),
84
69
            last_job_finished_ms: AtomicU64::new(0),
85
69
            last_error_code: AtomicU64::new(0),
86
69
            last_error_at_ms: AtomicU64::new(0),
87
69
            last_task_id_hi: AtomicU64::new(0),
88
69
            last_task_id_lo: AtomicU64::new(0),
89
69
        })
90
69
    }
91
92
18
    pub async fn run(self: Arc<Self>, gate: Option<Arc<StartGate>>) -> Result<(), SCloudException> {
93
18
        log_sdebug!(
94
            "Running SCloudWorker [ID: {}][TYPE: {:?}]",
95
18
            self.get_worker_id(),
96
18
            self.get_worker_type()
97
        );
98
99
18
        if let Some(g) = gate {
100
18
            g.done().await;
101
0
        }
102
18
        match WorkerType::try_from(self.worker_type.load(Ordering::Relaxed)).unwrap() {
103
            WorkerType::LISTENER => {
104
2
                return Err(SCloudException::SCLOUD_WORKER_LISTENER_NO_SOCKET);
105
            }
106
            WorkerType::DECODER => {
107
2
                self.clone().set_state(WorkerState::IDLE);
108
2
                let (
rx0
,
tx0
) = self.get_dns_rx_tx().await?;
109
0
                types::decoder::run_dns_decoder(self.clone(), rx, tx).await?;
110
            }
111
            WorkerType::QUERY_DISPATCHER => {
112
2
                self.clone().set_state(WorkerState::IDLE);
113
2
                let (
rx0
,
tx0
) = self.get_dns_rx_tx().await?;
114
0
                types::query_dispatcher::run_dns_query_dispatcher(self.clone(), rx, tx).await?;
115
            }
116
            WorkerType::CACHE_LOOKUP => {
117
2
                self.clone().set_state(WorkerState::IDLE);
118
2
                let (
rx0
,
tx0
) = self.get_dns_rx_tx().await?;
119
0
                types::cache_lookup::run_dns_cache_lookup(self.clone(), rx, tx).await?;
120
            }
121
            WorkerType::ZONE_MANAGER => {
122
2
                self.clone().set_state(WorkerState::IDLE);
123
2
                let (
rx0
,
tx0
) = self.get_dns_rx_tx().await?;
124
0
                types::zone_manager::run_dns_zone_manager(self.clone(), rx, tx).await?;
125
            }
126
            WorkerType::RESOLVER => {
127
2
                self.clone().set_state(WorkerState::IDLE);
128
2
                let (
rx0
,
tx0
) = self.get_dns_rx_tx().await?;
129
0
                types::resolver::run_dns_resolver(self.clone(), rx, tx).await?;
130
            }
131
            WorkerType::CACHE_WRITER => {
132
2
                self.clone().set_state(WorkerState::IDLE);
133
2
                let (
rx0
,
tx0
) = self.get_dns_rx_tx().await?;
134
0
                types::cache_writer::run_dns_cache_writer(self.clone(), rx, tx).await?;
135
            }
136
            WorkerType::ENCODER => {
137
2
                self.clone().set_state(WorkerState::IDLE);
138
2
                let (
rx0
,
tx0
) = self.get_dns_rx_tx().await?;
139
0
                types::encoder::run_dns_encoder(self.clone(), rx, tx).await?;
140
            }
141
            WorkerType::SENDER => {
142
1
                self.clone().set_state(WorkerState::IDLE);
143
1
                let 
rx0
= self.get_dns_rx().await?;
144
0
                types::sender::run_dns_sender(self.clone(), rx).await?;
145
            }
146
            WorkerType::CACHE_JANITOR => {
147
0
                self.clone().set_state(WorkerState::IDLE);
148
0
                types::cache_janitor::run_dns_cache_janitor(self.clone()).await?;
149
            }
150
            WorkerType::METRICS => {
151
0
                self.clone().set_state(WorkerState::IDLE);
152
0
                types::metrics::start_otlp_logger().await;
153
            }
154
            WorkerType::TCP_ACCEPTOR => {
155
1
                self.clone().set_state(WorkerState::IDLE);
156
1
                let 
tx0
= self.get_dns_tx().await?;
157
0
                types::tcp_acceptor::run_dns_tcp_acceptor(self.clone(), tx).await?;
158
            }
159
            WorkerType::DOH_ACCEPTOR => {
160
0
                self.clone().set_state(WorkerState::IDLE);
161
0
                let tx = self.get_dns_tx().await?;
162
0
                types::doh_acceptor::run_dns_doh_acceptor(self.clone(), tx).await?;
163
            }
164
0
            _ => {}
165
        }
166
0
        Ok(())
167
18
    }
168
169
    #[inline]
170
41
    pub fn get_worker_id(&self) -> u64 {
171
41
        self.worker_id.load(Ordering::Relaxed)
172
41
    }
173
174
    #[inline]
175
50
    pub fn get_worker_type(&self) -> WorkerType {
176
50
        WorkerType::try_from(self.worker_type.load(Ordering::Relaxed)).unwrap()
177
50
    }
178
179
    #[inline]
180
0
    pub async fn push_dns_rx(&self, rx: mpsc::Receiver<InFlightTask>) {
181
0
        self.dns_rx.lock().await.push(rx);
182
0
    }
183
184
    #[inline]
185
0
    pub async fn push_dns_tx_many(&self, txs: Vec<mpsc::Sender<InFlightTask>>) {
186
0
        self.dns_tx.lock().await.extend(txs);
187
0
    }
188
189
    #[inline]
190
15
    pub async fn get_dns_rx_tx(
191
15
        &self,
192
15
    ) -> Result<
193
15
        (
194
15
            Vec<mpsc::Receiver<InFlightTask>>,
195
15
            Vec<mpsc::Sender<InFlightTask>>,
196
15
        ),
197
15
        SCloudException,
198
15
    > {
199
15
        Ok((self.get_dns_rx().await
?7
,
self8
.get_dns_tx().await
?7
))
200
15
    }
201
202
    #[inline]
203
10
    pub async fn get_dns_tx(&self) -> Result<Vec<mpsc::Sender<InFlightTask>>, SCloudException> {
204
10
        let mut guard = self.dns_tx.lock().await;
205
10
        if guard.is_empty() {
206
8
            return Err(SCloudException::SCLOUD_WORKER_TX_NOT_SET);
207
2
        }
208
2
        Ok(std::mem::take(&mut *guard))
209
10
    }
210
211
    #[inline]
212
17
    pub async fn get_dns_rx(&self) -> Result<Vec<mpsc::Receiver<InFlightTask>>, SCloudException> {
213
17
        let mut guard = self.dns_rx.lock().await;
214
17
        if guard.is_empty() {
215
8
            return Err(SCloudException::SCLOUD_WORKER_RX_NOT_SET);
216
9
        }
217
9
        Ok(std::mem::take(&mut *guard))
218
17
    }
219
220
    #[inline]
221
2
    pub fn get_stack_size_bytes(&self) -> usize {
222
2
        self.stack_size_bytes.load(Ordering::Relaxed)
223
2
    }
224
225
    #[inline]
226
2
    pub fn get_buffer_budget_bytes(&self) -> usize {
227
2
        self.buffer_budget_bytes.load(Ordering::Relaxed)
228
2
    }
229
230
    #[inline]
231
2
    pub fn get_max_stack_size_bytes(&self) -> usize {
232
2
        self.max_stack_size_bytes.load(Ordering::Relaxed)
233
2
    }
234
235
    #[inline]
236
2
    pub fn get_max_buffer_budget_bytes(&self) -> usize {
237
2
        self.max_buffer_budget_bytes.load(Ordering::Relaxed)
238
2
    }
239
240
    #[inline]
241
7
    pub fn get_state(&self) -> u8 {
242
7
        self.state.load(Ordering::Acquire)
243
7
    }
244
245
    #[inline]
246
2
    pub fn get_shutdown_requested(&self) -> bool {
247
2
        self.shutdown_requested.load(Ordering::Acquire)
248
2
    }
249
250
    #[inline]
251
3
    pub fn get_shutdown_mode(&self) -> u8 {
252
3
        self.shutdown_mode.load(Ordering::Acquire)
253
3
    }
254
255
    #[inline]
256
2
    pub fn get_in_flight(&self) -> usize {
257
2
        self.in_flight.load(Ordering::Relaxed)
258
2
    }
259
260
    #[inline]
261
1
    pub fn get_in_flight_sem(&self) -> usize {
262
1
        self.in_flight_sem.available_permits()
263
1
    }
264
265
    #[inline]
266
3
    pub fn get_max_in_flight(&self) -> usize {
267
3
        self.max_in_flight.load(Ordering::Relaxed)
268
3
    }
269
270
    #[inline]
271
2
    pub fn get_jobs_done(&self) -> u64 {
272
2
        self.jobs_done.load(Ordering::Relaxed)
273
2
    }
274
275
    #[inline]
276
2
    pub fn get_jobs_failed(&self) -> u64 {
277
2
        self.jobs_failed.load(Ordering::Relaxed)
278
2
    }
279
280
    #[inline]
281
2
    pub fn get_jobs_retried(&self) -> u64 {
282
2
        self.jobs_retried.load(Ordering::Relaxed)
283
2
    }
284
285
    #[inline]
286
2
    pub fn get_last_job_started_ms(&self) -> u64 {
287
2
        self.last_job_started_ms.load(Ordering::Relaxed)
288
2
    }
289
290
    #[inline]
291
2
    pub fn get_last_job_finished_ms(&self) -> u64 {
292
2
        self.last_job_finished_ms.load(Ordering::Relaxed)
293
2
    }
294
295
    #[inline]
296
2
    pub fn get_last_error_code(&self) -> u64 {
297
2
        self.last_error_code.load(Ordering::Relaxed)
298
2
    }
299
300
    #[inline]
301
2
    pub fn get_last_error_at_ms(&self) -> u64 {
302
2
        self.last_error_at_ms.load(Ordering::Relaxed)
303
2
    }
304
305
    #[inline]
306
2
    pub fn get_last_task_id_hi(&self) -> u64 {
307
2
        self.last_task_id_hi.load(Ordering::Relaxed)
308
2
    }
309
310
    #[inline]
311
2
    pub fn get_last_task_id_lo(&self) -> u64 {
312
2
        self.last_task_id_lo.load(Ordering::Relaxed)
313
2
    }
314
315
    #[inline]
316
1
    pub fn set_worker_id(&self, worker_id: u64) {
317
1
        self.worker_id.store(worker_id, Ordering::Relaxed);
318
1
    }
319
320
    #[inline]
321
13
    pub fn set_worker_type(&self, worker_type: WorkerType) {
322
13
        self.worker_type.store(worker_type as u8, Ordering::Relaxed);
323
13
    }
324
325
    #[inline]
326
1
    pub async fn set_dns_tx(&self, tx: mpsc::Sender<InFlightTask>) {
327
1
        self.dns_tx.lock().await.push(tx);
328
1
    }
329
330
    #[inline]
331
1
    pub async fn set_dns_rx(&self, rx: mpsc::Receiver<InFlightTask>) {
332
1
        self.dns_rx.lock().await.push(rx);
333
1
    }
334
335
    #[inline]
336
1
    pub fn set_stack_size_bytes(&self, stack_size_bytes: usize) {
337
1
        self.stack_size_bytes
338
1
            .store(stack_size_bytes, Ordering::Relaxed);
339
1
    }
340
341
    #[inline]
342
1
    pub fn set_buffer_budget_bytes(&self, buffer_budget_bytes: usize) {
343
1
        self.buffer_budget_bytes
344
1
            .store(buffer_budget_bytes, Ordering::Relaxed);
345
1
    }
346
347
    #[inline]
348
1
    pub fn set_max_stack_size_bytes(&self, max_stack_size_bytes: usize) {
349
1
        self.max_stack_size_bytes
350
1
            .store(max_stack_size_bytes, Ordering::Relaxed);
351
1
    }
352
353
    #[inline]
354
1
    pub fn set_max_buffer_budget_bytes(&self, max_buffer_budget_bytes: usize) {
355
1
        self.max_buffer_budget_bytes
356
1
            .store(max_buffer_budget_bytes, Ordering::Relaxed);
357
1
    }
358
359
    #[inline]
360
24
    pub fn set_state(&self, state: WorkerState) {
361
24
        self.state.store(state as u8, Ordering::Relaxed);
362
24
    }
363
364
    #[inline]
365
1
    pub fn set_shutdown_requested(&self, shutdown_requested: bool) {
366
1
        self.shutdown_requested
367
1
            .store(shutdown_requested, Ordering::Relaxed);
368
1
    }
369
370
    #[inline]
371
2
    pub fn set_shutdown_mode(&self, shutdown_mode: ShutdownMode) {
372
2
        self.shutdown_mode
373
2
            .store(shutdown_mode as u8, Ordering::Relaxed);
374
2
    }
375
376
    #[inline]
377
1
    pub fn set_in_flight(&self, in_flight: usize) {
378
1
        self.in_flight.store(in_flight, Ordering::Relaxed);
379
1
    }
380
381
    #[inline]
382
5
    pub fn set_max_in_flight(&self, max_in_flight: usize) {
383
5
        self.max_in_flight.store(max_in_flight, Ordering::Relaxed);
384
5
    }
385
386
    #[inline]
387
1
    pub fn set_jobs_done(&self, jobs_done: u64) {
388
1
        self.jobs_done.store(jobs_done, Ordering::Relaxed);
389
1
    }
390
391
    #[inline]
392
1
    pub fn set_jobs_failed(&self, jobs_failed: u64) {
393
1
        self.jobs_failed.store(jobs_failed, Ordering::Relaxed);
394
1
    }
395
396
    #[inline]
397
1
    pub fn set_jobs_retried(&self, jobs_retried: u64) {
398
1
        self.jobs_retried.store(jobs_retried, Ordering::Relaxed);
399
1
    }
400
401
    #[inline]
402
1
    pub fn set_last_job_started_ms(&self, last_job_started_ms: u64) {
403
1
        self.last_job_started_ms
404
1
            .store(last_job_started_ms, Ordering::Relaxed);
405
1
    }
406
407
    #[inline]
408
1
    pub fn set_last_job_finished_ms(&self, last_job_finished_ms: u64) {
409
1
        self.last_job_finished_ms
410
1
            .store(last_job_finished_ms, Ordering::Relaxed);
411
1
    }
412
413
    #[inline]
414
1
    pub fn set_last_error_code(&self, last_error_code: u64) {
415
1
        self.last_error_code
416
1
            .store(last_error_code, Ordering::Relaxed);
417
1
    }
418
419
    #[inline]
420
1
    pub fn set_last_error_at_ms(&self, last_error_at_ms: u64) {
421
1
        self.last_error_at_ms
422
1
            .store(last_error_at_ms, Ordering::Relaxed);
423
1
    }
424
425
    #[inline]
426
1
    pub fn set_last_task_id_hi(&self, last_task_id_hi: u64) {
427
1
        self.last_task_id_hi
428
1
            .store(last_task_id_hi, Ordering::Relaxed);
429
1
    }
430
431
    #[inline]
432
1
    pub fn set_last_task_id_lo(&self, last_task_id_lo: u64) {
433
1
        self.last_task_id_lo
434
1
            .store(last_task_id_lo, Ordering::Relaxed);
435
1
    }
436
}
437
438
#[repr(u8)]
439
#[allow(unused)]
440
#[allow(non_camel_case_types)]
441
#[derive(Debug, Copy, Clone, PartialEq, Serialize, Deserialize, Eq)]
442
pub enum WorkerType {
443
    NONE = 99,
444
    LISTENER = 0,
445
    DECODER = 1,
446
    QUERY_DISPATCHER = 2,
447
    CACHE_LOOKUP = 3,
448
    ZONE_MANAGER = 4,
449
    RESOLVER = 5,
450
    CACHE_WRITER = 6,
451
    ENCODER = 7,
452
    SENDER = 8,
453
454
    CACHE_JANITOR = 9,
455
456
    METRICS = 10,
457
    TCP_ACCEPTOR = 11,
458
    DOH_ACCEPTOR = 12,
459
}
460
461
impl TryFrom<u8> for WorkerType {
462
    type Error = ();
463
464
69
    fn try_from(v: u8) -> Result<Self, Self::Error> {
465
69
        Ok(match v {
466
7
            0 => WorkerType::LISTENER,
467
7
            1 => WorkerType::DECODER,
468
7
            2 => WorkerType::QUERY_DISPATCHER,
469
7
            3 => WorkerType::CACHE_LOOKUP,
470
7
            4 => WorkerType::ZONE_MANAGER,
471
7
            5 => WorkerType::RESOLVER,
472
7
            6 => WorkerType::CACHE_WRITER,
473
7
            7 => WorkerType::ENCODER,
474
4
            8 => WorkerType::SENDER,
475
1
            9 => WorkerType::CACHE_JANITOR,
476
1
            10 => WorkerType::METRICS,
477
5
            11 => WorkerType::TCP_ACCEPTOR,
478
0
            12 => WorkerType::DOH_ACCEPTOR,
479
2
            99 => WorkerType::NONE,
480
            // TODO: return an SCloudException
481
0
            _ => return Err(()),
482
        })
483
69
    }
484
}
485
486
/// Lifecycle state of a [`SCloudWorker`], stored as its `u8` discriminant in
/// `SCloudWorker::state`.
///
/// `Clone`, `Copy` and `Eq` are derived, matching `WorkerType`. The type is a
/// fieldless enum that is passed by value (e.g. to `set_state`).
#[repr(u8)]
#[allow(unused)] // only INIT and IDLE are constructed in this file
#[allow(non_camel_case_types)] // SCREAMING_CASE variants are the project convention
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum WorkerState {
    INIT = 0,
    IDLE = 1,
    BUSY = 2,
    PAUSED = 3,
    STOPPING = 4,
    STOPPED = 5,
}
498
499
impl TryFrom<u8> for WorkerState {
500
    type Error = ();
501
502
6
    fn try_from(v: u8) -> Result<Self, Self::Error> {
503
6
        Ok(match v {
504
1
            0 => WorkerState::INIT,
505
1
            1 => WorkerState::IDLE,
506
1
            2 => WorkerState::BUSY,
507
1
            3 => WorkerState::PAUSED,
508
1
            4 => WorkerState::STOPPING,
509
1
            5 => WorkerState::STOPPED,
510
            // TODO: return an SCloudException
511
0
            _ => return Err(()),
512
        })
513
6
    }
514
}
515
516
/// How a [`SCloudWorker`] is asked to shut down, stored as its `u8` discriminant
/// in `SCloudWorker::shutdown_mode`.
///
/// `Clone`, `Copy` and `Eq` are derived, matching `WorkerType`. The type is a
/// fieldless enum that is passed by value (e.g. to `set_shutdown_mode`).
#[repr(u8)]
#[allow(unused)] // only GRACEFUL is constructed in this file
#[allow(non_camel_case_types)] // SCREAMING_CASE variants are the project convention
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum ShutdownMode {
    GRACEFUL = 0,
    IMMEDIATE = 1,
}
524
525
impl TryFrom<u8> for ShutdownMode {
526
    type Error = ();
527
528
2
    fn try_from(v: u8) -> Result<Self, Self::Error> {
529
2
        Ok(match v {
530
1
            0 => ShutdownMode::GRACEFUL,
531
1
            1 => ShutdownMode::IMMEDIATE,
532
            // TODO: return an SCloudException
533
0
            _ => return Err(()),
534
        })
535
2
    }
536
}
537
538
0
pub fn spawn_worker(
539
0
    worker: Arc<SCloudWorker>,
540
0
    gate: Arc<StartGate>,
541
0
) -> tokio::task::JoinHandle<()> {
542
0
    tokio::spawn(async move {
543
0
        gate.wait_turn(worker.get_worker_id()).await;
544
545
0
        if let Err(e) = worker.clone().run(Some(gate.clone())).await {
546
0
            log_error!("Worker {} failed: {:?}", worker.get_worker_id(), e);
547
0
        }
548
549
0
        gate.done().await;
550
0
    })
551
0
}